package com.kool.kmqtt.server.processer;

import com.kool.kmqtt.server.PacketSender;
import com.kool.kmqtt.server.ServerConfig;
import com.kool.kmqtt.server.encoder.PublishPacketEncoder;
import com.kool.kmqtt.server.packet.Packet;
import com.kool.kmqtt.server.packet.PacketIdGenerator;
import com.kool.kmqtt.server.packet.PublishVariableHeader;
import com.kool.kmqtt.server.repository.Repository;
import com.kool.kmqtt.server.repository.RepositoryFactory;
import com.kool.kmqtt.server.repository.subscription.Subscriber;
import com.kool.kmqtt.server.session.SessionContext;
import com.kool.kmqtt.server.session.SessionHolder;
import com.kool.kmqtt.service.KauthService;
import com.kool.kmqtt.service.request.TopicAuthReq;
import com.kool.kmqtt.service.vo.TopicAuthResp;
import com.kool.kmqtt.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * Static helpers for delivering PUBLISH packets to clients.
 *
 * @author luyu
 */
@Slf4j
public class PublishUtil {

    /**
     * Distributes a PUBLISH packet to every subscriber whose subscription matches
     * the packet's topic name and for whom a session context currently exists.
     *
     * <p>For each such subscriber the packet's fixed header is rewritten in place:
     * the QoS is set to the lower of the subscriber's QoS and the server-configured
     * QoS, and the RETAIN flag is cleared. The packet is then handed to
     * {@link #sendPublish(Packet, int, SessionContext)}.
     *
     * @param packet the PUBLISH packet to distribute; its variable header must be a
     *               {@link PublishVariableHeader}. The packet is mutated by this method.
     */
    public static void sendPublishToSubscribers(Packet packet) {
        Repository repository = RepositoryFactory.getRepository();
        String topicName = ((PublishVariableHeader) packet.getVariableHeader()).getTopicName();

        // Look up the subscribers whose subscriptions match this topic.
        List<Subscriber> subscribers = repository.getSubscriber(topicName);
        if (subscribers == null || subscribers.isEmpty()) {
            return;
        }

        // The server-side QoS cap is loop-invariant, so read it once.
        int serverQos = ServerConfig.getInstance().getQos();

        // NOTE(review): the same Packet instance is mutated and reused for every
        // subscriber. If the repository keeps a reference to it (rather than a copy)
        // in saveSendPackets, later iterations would overwrite the QoS / packet id of
        // entries saved earlier - confirm against the Repository implementation.
        for (Subscriber subscriber : subscribers) {
            String subClientId = subscriber.getClientId();
            // Effective QoS is the lower of the subscription QoS and the server cap.
            int subQos = Math.min(subscriber.getQos(), serverQos);

            // Only subscribers that currently have a session context receive the packet.
            SessionContext subSessionContext = SessionHolder.getInstance().getByClientId(subClientId);
            if (subSessionContext == null) {
                continue;
            }

            packet.getFixedHeader().setQoS(subQos);
            // When a PUBLISH packet is sent to a client because it matches an established
            // subscription, the server must set the RETAIN flag to 0.
            packet.getFixedHeader().setRetain(false);
            sendPublish(packet, subQos, subSessionContext);
        }
    }

    /**
     * Sends a PUBLISH packet to the client behind the given session.
     *
     * <p>If the topic-auth switch is enabled in {@link ServerConfig}, the session's
     * user name is first checked against the packet's topic through
     * {@link KauthService}; when the check does not succeed the packet is not sent.
     * For QoS 1 and 2 a packet id is generated for the client, written to both the
     * variable header and the packet, the send time is stamped, and the packet is
     * saved through the repository before it is sent.
     *
     * @param packet         the PUBLISH packet to send; mutated for QoS 1 and 2
     * @param qos            the QoS level to deliver with
     * @param sessionContext the session of the receiving client
     */
    public static void sendPublish(Packet packet, int qos, SessionContext sessionContext) {
        // Verify topic permission when the switch is on; drop the packet if denied.
        if (ServerConfig.getInstance().getTopicAuthSwitch() && !isPublishAuthorized(packet, sessionContext)) {
            return;
        }

        Repository repository = RepositoryFactory.getRepository();

        String clientId = sessionContext.getClientId();
        if (qos == 1 || qos == 2) {
            // Consume one packet id for this client.
            int sendPacketId = PacketIdGenerator.generateId(clientId);
            ((PublishVariableHeader) packet.getVariableHeader()).setPacketId(sendPacketId);
            packet.setPacketId(sendPacketId);
            packet.setSendTime(System.currentTimeMillis());
            // Save the outbound QoS 1/2 message as an unacknowledged outbound PUBLISH.
            repository.saveSendPackets(clientId, packet);
        }

        // Encode and send.
        PacketSender packetSender = new PacketSender(sessionContext, new PublishPacketEncoder());
        packetSender.send(packet);
    }

    /**
     * Asks the auth service whether the session's user passes the "publish" topic
     * check for the packet's topic. A missing (null) response is treated as a denial
     * so that a failed lookup does not raise a NullPointerException or let the
     * packet through.
     */
    private static boolean isPublishAuthorized(Packet packet, SessionContext sessionContext) {
        String userName = sessionContext.getUserName();
        String topicName = ((PublishVariableHeader) packet.getVariableHeader()).getTopicName();

        TopicAuthReq request = new TopicAuthReq();
        request.setUserName(userName);
        request.setTopicName(topicName);
        request.setAction("publish");

        KauthService kauthService = SpringUtil.getBean(KauthService.class);
        TopicAuthResp resp = kauthService.topicAuth(request);
        if (resp == null || resp.getIsSuccess() != 1) {
            log.info("主题权限验证不通过，不发布：用户名={}，主题名={}", userName, topicName);
            return false;
        }
        return true;
    }
}
